/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */

#pragma once

#include <seastar/net/packet.hh>
#include <map>

namespace seastar {

namespace net {

// Reassembles a byte stream from packets that may arrive out of order,
// duplicated or overlapping. Each stored segment is kept in `map` under the
// offset of its first byte.
//
// Offset must support the comparisons and +/- arithmetic used in merge().
// Tag is not referenced by the class body; different Tag types produce
// different instantiations and therefore separate linearization counters.
template <typename Offset, typename Tag>
class packet_merger {
public:
    // Stored segments, ordered by the offset at which each one starts.
    std::map<Offset, packet> map;

    // Number of packet::linearize() calls merge() has made on the calling
    // thread for this instantiation of the template.
    static uint64_t linearizations() {
        return linearizations_ref();
    }

    // Folds packet `p`, whose first byte sits at `offset`, into `map`.
    //
    // Pass 1 resolves `p` against the first stored segment it overlaps or
    // touches; pass 2 then coalesces neighbouring segments, which also cleans
    // up any further segments that pass 1 did not look at.
    void merge(Offset offset, packet p) {
        auto new_beg = offset;
        auto new_end = new_beg + p.len();
        // Stays true unless the payload is appended to an existing segment.
        bool store_new = true;

        // Pass 1. Every branch except "no overlap" leaves the loop, so the
        // iterator only advances when the current segment is unrelated to p.
        for (auto it = map.begin(); it != map.end(); ++it) {
            auto& old_pkt = it->second;
            auto old_beg = it->first;
            auto old_end = old_beg + old_pkt.len();

            if (old_beg <= new_beg && new_end <= old_end) {
                // old_beg new_beg new_end old_end
                // Nothing new here: the stored segment already covers p.
                return;
            }

            if (new_beg <= old_beg && old_end <= new_end) {
                // new_beg old_beg old_end new_end
                // p covers the stored segment completely; drop the stored
                // one and let p take its place.
                map.erase(it);
                break;
            }

            if (new_beg < old_beg && old_beg <= new_end && new_end <= old_end) {
                // new_beg old_beg new_end old_end
                // p runs into the head of the stored segment: cut the shared
                // bytes off the stored data and hang the rest on the end of p.
                auto overlap = new_end - old_beg;
                old_pkt.trim_front(overlap);
                p.append(std::move(old_pkt));
                // The combined data is re-inserted under new_beg below.
                map.erase(it);
                break;
            }

            if (old_beg <= new_beg && new_beg <= old_end && old_end < new_end) {
                // old_beg new_beg old_end new_end
                // p extends the tail of the stored segment: cut the shared
                // bytes off p and grow the stored segment in place.
                auto overlap = old_end - new_beg;
                p.trim_front(overlap);
                old_pkt.append(std::move(p));
                old_pkt.linearize();
                ++linearizations_ref();
                store_new = false;
                break;
            }

            // Either p ends before this segment starts, or this segment ends
            // before p starts. No overlap; try the next segment.
        }

        if (store_new) {
            p.linearize();
            ++linearizations_ref();
            map.emplace(new_beg, std::move(p));
        }

        // Pass 2. The data just added may have filled a hole between two
        // stored segments, or may overlap segments pass 1 never reached, so
        // walk the map and fold each segment's successor into it when the two
        // overlap or touch. Merges done here are not linearized or counted.
        auto cur = map.begin();
        while (cur != map.end()) {
            auto& cur_pkt = cur->second;
            auto cur_beg = cur->first;
            auto cur_end = cur_beg + cur_pkt.len();

            auto nxt = cur;
            ++nxt;
            if (nxt == map.end()) {
                break;
            }
            auto& nxt_pkt = nxt->second;
            auto nxt_beg = nxt->first;
            auto nxt_end = nxt_beg + nxt_pkt.len();

            if (cur_beg <= nxt_beg && nxt_beg <= cur_end && cur_end < nxt_end) {
                // The successor starts inside (or right at the end of) the
                // current segment and extends past it: append its new bytes.
                auto overlap = cur_end - nxt_beg;
                nxt_pkt.trim_front(overlap);
                cur_pkt.append(std::move(nxt_pkt));
                map.erase(nxt);
                // `cur` is left alone so that the enlarged segment is compared
                // against its new successor on the next iteration.
            } else if (nxt_end <= cur_end) {
                // The successor lies entirely within the current segment.
                map.erase(nxt);
            } else if (cur_end < nxt_beg) {
                // A gap separates the two segments; move on.
                cur = nxt;
            } else {
                // Not expected to happen: reaching this branch means the
                // merge logic itself is broken.
                std::abort();
            }
        }
    }

private:
    // Storage for the counter reported by linearizations(): one instance per
    // thread and per template instantiation, zero-initialized.
    static uint64_t& linearizations_ref() {
        static thread_local uint64_t linearization_count;
        return linearization_count;
    }
};

}

}
